package hbasecli.service;

import hbasecli.exception.HBaseDataAccessException;
import hbasecli.model.bytes.CellInfo;
import hbasecli.model.bytes.RowInfo;
import hbasecli.model.bytes.TableFamilyInfo;
import hbasecli.model.bytes.TableInfo;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FuzzyRowFilter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.*;

public class HBaseService {

    private static final Logger log = LoggerFactory.getLogger(HBaseService.class);

    public List<TableInfo> listTable(
            Connection connection
    ) {
        try (Admin admin = connection.getAdmin()) {
            List<TableDescriptor> tableDescriptors = admin.listTableDescriptors();
            if (tableDescriptors == null) {
                return Collections.emptyList();
            }
            List<TableInfo> tableInfoList = new ArrayList<>();
            for (TableDescriptor td : tableDescriptors) {
                List<TableFamilyInfo> tableFamilyInfoList = new ArrayList<>();
                for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
                    Map<byte[], byte[]> info = new HashMap<>();
                    for (Map.Entry<Bytes, Bytes> entry : cfd.getValues().entrySet()) {
                        info.put(entry.getKey().get(), entry.getValue().get());
                    }
                    TableFamilyInfo tableFamilyInfo = new TableFamilyInfo();
                    tableFamilyInfo.setName(cfd.getName());
                    tableFamilyInfo.setInfo(info);
                    tableFamilyInfoList.add(tableFamilyInfo);
                }
                TableInfo tableInfo = new TableInfo();
                tableInfo.setName(td.getTableName().getNameAsString());
                tableInfo.setFamilies(tableFamilyInfoList);
                tableInfoList.add(tableInfo);
            }
            return tableInfoList;
        } catch (IOException e) {
            log.error("读取表数据失败: ", e);
            throw new HBaseDataAccessException(e);
        }
    }

    public boolean createTable(
            Connection connection,
            String tableName,
            Collection<byte[]> families,
            boolean deleteIfExists
    ) {
        try (Admin admin = connection.getAdmin()) {
            TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(
                    TableName.valueOf(tableName)
            );
            List<ColumnFamilyDescriptor> columnFamilies = new ArrayList<>();
            if (families != null && !families.isEmpty()) {
                for (byte[] family : families) {
                    if (family == null) {
                        continue;
                    }
                    ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(family).build();
                    columnFamilies.add(cfd);
                }
            }
            builder.setColumnFamilies(columnFamilies);
            TableDescriptor table = builder.build();
            if (admin.tableExists(table.getTableName())) {
                if (deleteIfExists) {
                    admin.disableTable(table.getTableName());
                    admin.deleteTable(table.getTableName());
                } else {
                    throw new Exception("表 '" + tableName + "' 已经存在");
                }
            }
            admin.createTable(table);
            return true;
        } catch (Exception e) {
            log.error("新建表失败: ", e);
            throw new HBaseDataAccessException(e);
        }
    }

    public boolean deleteTable(
            Connection connection,
            String tableName
    ) {
        try (Admin admin = connection.getAdmin()) {
            TableName name = TableName.valueOf(tableName);
            if (!admin.tableExists(name)) {
                return true;
            }
            admin.disableTable(name);
            admin.deleteTable(name);
            return true;
        } catch (IOException e) {
            log.error("删除表失败: ", e);
            throw new HBaseDataAccessException(e);
        }
    }

    public boolean deleteTable(
            Connection connection,
            Collection<String> tableNames
    ) {
        for (String tableName : tableNames) {
            if (!deleteTable(connection, tableName)) {
                return false;
            }
        }
        return true;
    }

    public boolean put(
            Connection connection,
            String tableName,
            byte[] rowKey,
            Collection<byte[][]> familyQualifierValues
    ) {
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            List<Put> putList = new ArrayList<>();
            for (byte[][] familyQualifierValue : familyQualifierValues) {
                byte[] family = familyQualifierValue[0];
                byte[] qualifier = familyQualifierValue[1];
                byte[] value = familyQualifierValue[2];
                Put put = new Put(rowKey);
                put.addColumn(family, qualifier, value);
                putList.add(put);
            }
            table.put(putList);
            return true;
        } catch (IOException e) {
            log.error("新增数据失败: ", e);
            throw new HBaseDataAccessException(e);
        }
    }

    public List<RowInfo> scan(
            Connection connection,
            String tableName,
            Filter filter,
            byte[] startRow
    ) {
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            Scan scan = new Scan();
            if (startRow != null) {
                scan.withStartRow(startRow);
            }
            if (filter != null) {
                scan.setFilter(filter);
            }
            scan.setMaxResultSize(1024);
            List<RowInfo> rowInfoList = new ArrayList<>();
            try (ResultScanner scanner = table.getScanner(scan)) {
                for (Result result : scanner) {
                    RowInfo rowInfo = new RowInfo();
                    List<CellInfo> cellInfoList = new ArrayList<>();
                    for (Cell cell : result.listCells()) {
                        CellInfo cellInfo = new CellInfo();
                        rowInfo.setRowKey(CellUtil.cloneRow(cell));
                        cellInfo.setFamily(CellUtil.cloneFamily(cell));
                        cellInfo.setQualifier(CellUtil.cloneQualifier(cell));
                        cellInfo.setValue(CellUtil.cloneValue(cell));
                        cellInfoList.add(cellInfo);
                    }
                    rowInfo.setCells(cellInfoList);
                    rowInfoList.add(rowInfo);
                }
            } catch (IOException e) {
                log.error("数据读取异常: ", e);
                throw new HBaseDataAccessException(e);
            }
            return rowInfoList;
        } catch (IOException e) {
            log.error("查询失败: ", e);
            throw new HBaseDataAccessException(e);
        }
    }

    @SuppressWarnings("unused")
    public List<RowInfo> scanWithPrefix(
            Connection connection,
            String tableName,
            byte[] rowPrefix,
            byte[] startRow
    ) {
        PrefixFilter filter = new PrefixFilter(rowPrefix);
        return scan(connection, tableName, filter, startRow);
    }

    public List<RowInfo> fuzzyScan(
            Connection connection,
            String tableName,
            byte[] rowKeyPattern,
            char placeholder,
            byte[] startRow
    ) {
        byte[] mask = new byte[rowKeyPattern.length];
        for (int i = 0; i < rowKeyPattern.length; i++) {
            if (rowKeyPattern[i] == placeholder) {
                mask[i] = (byte) 1;
            }
        }
        Pair<byte[], byte[]> pair = new Pair<>(rowKeyPattern, mask);
        FuzzyRowFilter filter = new FuzzyRowFilter(Collections.singletonList(pair));
        return scan(connection, tableName, filter, startRow);
    }

    public boolean deleteRow(
            Connection connection,
            String tableName,
            Collection<byte[]> rowKeys
    ) {
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            List<Delete> deleteList = new ArrayList<>();
            for (byte[] rowKey : rowKeys) {
                deleteList.add(new Delete(rowKey));
            }
            table.delete(deleteList);
            return true;
        } catch (IOException e) {
            log.error("数据删除失败: ", e);
            throw new HBaseDataAccessException(e);
        }
    }

    protected void checkConnection(Connection connection) {
        if (connection == null) {
            throw new HBaseDataAccessException("不存在的连接");
        }
        if (connection.isClosed()) {
            throw new HBaseDataAccessException("连接已关闭");
        }
        if (connection.isAborted()) {
            throw new HBaseDataAccessException("连接发生了错误");
        }
    }

}
